
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/filesystem.hpp>
#include <iostream>
#include <string>

#include "zmq_messenger.h"
#include "loghelper.h"
#include "bytes_buffer.hpp"
#include "heartbeat.h"
#include "app_common.h"
#include "message_handler.h"
#include "serialize.hpp"
#include "utility.hpp"
#include "messagebus.h"
using namespace RockLog;

MessageHandler::~MessageHandler()
{
    if (m_messenger)
        delete m_messenger;
}

void MessageHandler::init(const std::string &workerId)
{
    m_workerId = workerId;
    if (m_startHeartBeat)
        startHeartBeat(m_workerId);
    m_messenger = new ZMQMessenger(m_workerId);
    m_messenger->init();
    g_messagebus.attach<const char *, uint32_t>(kZmqRecvMSG, &MessageHandler::onRecvMsg, *this);
    
}

bool MessageHandler::publishVideoAndTracks(const std::string& tracksStr, const std::string& videoFile)
{
    boost::filesystem::path{videoFile}.filename().string();
    std::ifstream is(videoFile, std::ifstream::binary);   // open file
    std::ostringstream tmp;
    tmp << is.rdbuf();
    publishVideoAndTracks(tracksStr, boost::filesystem::path{videoFile}.filename().string(), tmp.str());
    return true;
}

bool MessageHandler::publishVideoAndTracks(const std::string& tracksStr, const std::string& videoName, const std::string& videoContent)
{
    VideoAndTracks_t t;
    t.tracksStr = tracksStr;
    t.videoName = videoName;
    t.videoData = videoContent;

    std::stringstream ss;
    serialize(ss, t.tracksStr);
    serialize(ss, t.videoName);
    serialize(ss, t.videoData);

    LOG(kInfo) << "tracksStr size: " << t.tracksStr.size();

    BytesBuffer buf(4 + ss.str().size());
    buf.append((char *)&kWorkerUploadVideoAndTracks_t, 4);
    buf.append(ss.str());
    m_messenger->publishMsg(buf.begin(), buf.readableBytes());
    LOG(kInfo) << "[MessageHandler] publish file: " << videoName;

    return true;
}

void MessageHandler::doCameraInfo(BytesBuffer &buf)
{
    CameraInfo_t t;
    std::istringstream is(buf.retrieveAllAsString());
    deserialize(is, t);
    LOG(kInfo) << "name:" << t.name << ",camera_id:" << t.camera_id
                << ",main_pixel_w:" << t.main_pixel_w << ",main_pixel_h:" << t.main_pixel_h;
    g_messagebus.sendMessage(kRecvCameraInfo, t);   // 发布
}


void MessageHandler::onRecvMsg(const char *msg, uint32_t len)
{
    LOG(kInfo) << "[onRecvMsg]";
    BytesBuffer buf;
    char recvId[5] = {0};
    uint32_t type; //

    buf.append(msg, len);
    buf.retrieve(recvId, 4);
    //if (m_workerId != std::string(recvId))
    //{
    //    LOG(kErr) << "invalid workerId：" << recvId;
    //    return;
    //}

    buf.retrieve((char *)&type, 4);
    switch (type)
    {
    case kJsonMessage:
        LOG(kInfo) << "receive json message";
        break;
    case kStringMessage:
        LOG(kInfo) << "receive string message";
        break;
    case kCameraInfo_t:
    {
        LOG(kInfo) << "receive CameraInfo_t";
        doCameraInfo(buf);
        break;
    }
    default:
        LOG(kInfo) << "type:" << type;
        break;
    }
}
